Lambda でリソースベースポリシーのプリンシパルを監視してみた

Lambda でリソースベースポリシーのプリンシパルを監視してみた

はじめに

こんにちは、さすけです!
以前、名古屋の勉強会 に登壇させていただいた際に、リソースベースポリシーのプリンシパルが自動的に変更されることがあるという話をしました(詳しくは下で説明します)。
その際、質問をいただきましたので検証をおこなってみました。少し長くなりますが、お付き合いください!

プリンシパルが自動で変わる!?

まず、先述しておりました「プリンシパルが自動で変わる」というのは、どういう状況なのでしょうか?
これは「プリンシパル が AIDA あるいは AROA から始まる文字列に置き換わる」という事象です。

AWS の re:Post には下記のような記載がありました。

==== 抜粋 ====
リソースベースのポリシーの一意のプリンシパル ID は、IAM ユーザーまたはロールが削除されたことを示します。AWS が有効な ARN にマッピングし直すことができないため、プリンシパル ID が表示されます。
==== 抜粋 ====

ようは、プリンシパルに設定していた IAM ユーザーあるいは IAM ロールが削除されてしまったことが原因のようですね。IAM ユーザーが削除された場合には AIDA、IAM ロールが削除された場合には AROA に置き換わります。
実際に置き換わったポリシーは以下のようになります。

"Principal": {
                "AWS": [
                    "AIDAxxxxxxxxxxxxxxxxx",
                    "AROAxxxxxxxxxxxxxxxxx"
                ]
            }

これをそのままにしておくと、ポリシーの変更を保存できなかったりするので、プリンシパルから削除するか有効な ARN に置き換えることが望ましいと言えます。

いただいた質問

質問の内容は「AIDA や AROA に置き換わったことを検知する方法はありますか?」という内容でした。
検証したことがなかったので、その場ではっきりとした回答ができませんでした(すみません、、)。

結論から申し上げますと 可能です
ただ、少し手間がかかりますのであらかじめご了承ください。

検証

検知および通知に使用するサービスは、Lambda、EventBridge、SNS です。
概要としては、EventBridge をトリガーとした Lambda を作成し、スケジュール実行(今回の検証では週に 1 回)させるというものになります。

検証用リソースベースポリシーとして、S3 バケットのバケットポリシーを使用しますので、S3 バケットも後々作成します。

SNS トピックの作成

まずは、メール通知用の SNS トピックを作成します。
トピック名は InvalidPrincipalNotification とします。

SNS トピックの作成手順は下記ブログがわかりやすくまとまっておりますので、ご参照ください。

https://dev.classmethod.jp/articles/intor-subscribe-sns-with-email/

Lambda 用 IAM ロールの作成

次に、Lambda 用に LambdaResourcePolicyMonitorRole という名前で IAM ロールを作成します。以下ポリシーを割り当てます。
今回の検証では、S3 のリソースベースポリシーであるバケットポリシーを監視してみるので、s3:ListAllMyBuckets s3:GetBucketPolicy を付与しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:ListRoles",
                "iam:GetRole",
                "iam:ListRolePolicies",
                "iam:GetRolePolicy"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:GetBucketPolicy"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:InvalidPrincipalNotification"
        }
    ]
}

実際の運用では、管理を容易にするために AWS マネージドポリシーをアタッチするのが良いかと思いますが、今回の検証ではパッと見のわかりやすさから、1 つのカスタマー管理ポリシーにまとめております。

Lambda 関数の作成

Python の Lambda 関数 を TestDeletePrincipal という名前で作成します。今回は Python3.13 を使用しております。
関数作成時に、先ほど作成した IAM ロール LambdaResourcePolicyMonitorRole を選択してください。

Lambda では、AIDA あるいは AROA から始まるプリンシパルを検出し、検出されたコンソール URL を添付したメールを送信するようにします。

実際のコードは「Lambda コードを見る」をクリックしてです(少々長いのでトグルに格納しています)。
また、あらかじめ Lambda の環境変数に先ほど作成した SNS トピック の ARN を SNS_TOPIC_ARN として設定しておきましょう。

Lambda コードを見る
import boto3
import json
import os
from botocore.exceptions import ClientError

# AWSクライアントの初期化
s3 = boto3.client('s3')
sns = boto3.client('sns')
sqs = boto3.client('sqs')
lambda_client = boto3.client('lambda')

SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
BUCKETS_TO_MONITOR = os.environ.get('BUCKETS_TO_MONITOR', '').split(',')

# オプション: リージョンの指定が必要な場合
AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')  # 例: 'ap-northeast-1'

def get_s3_buckets():
    try:
        response = s3.list_buckets()
        return [bucket['Name'] for bucket in response['Buckets']]
    except ClientError as e:
        print(f"Error listing S3 buckets: {e}")
        return []

def get_s3_bucket_policies(buckets):
    policies = {}
    for bucket in buckets:
        try:
            policy = s3.get_bucket_policy(Bucket=bucket)
            policies[bucket] = json.loads(policy['Policy'])
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'NoSuchBucketPolicy':
                policies[bucket] = {}
            elif error_code == 'NoSuchBucket':
                print(f"バケット '{bucket}' が存在しません。")
                policies[bucket] = {}
            else:
                print(f"バケット '{bucket}' のポリシー取得中にエラーが発生しました: {e}")
                policies[bucket] = {}
        except Exception as e:
            print(f"バケット '{bucket}' のポリシー取得中に予期しないエラーが発生しました: {e}")
            policies[bucket] = {}
    return policies

def get_sns_policies():
    policies = {}
    try:
        response = sns.list_topics()
        topics = response.get('Topics', [])
        for topic in topics:
            topic_arn = topic['TopicArn']
            try:
                policy = sns.get_topic_attributes(TopicArn=topic_arn)['Attributes'].get('Policy', '{}')
                policies[topic_arn] = json.loads(policy)
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'NotFound':
                    policies[topic_arn] = {}
                else:
                    print(f"SNSトピック '{topic_arn}' のポリシー取得中にエラーが発生しました: {e}")
                    policies[topic_arn] = {}
            except Exception as e:
                print(f"SNSトピック '{topic_arn}' のポリシー取得中に予期しないエラーが発生しました: {e}")
                policies[topic_arn] = {}
    except ClientError as e:
        print(f"Error listing SNS topics: {e}")
    return policies

def get_sqs_policies():
    policies = {}
    try:
        response = sqs.list_queues()
        queues = response.get('QueueUrls', [])
        for queue_url in queues:
            try:
                attributes = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['Policy'])['Attributes']
                policy = attributes.get('Policy', '{}')
                policies[queue_url] = json.loads(policy)
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'AWS.SimpleQueueService.NonExistentQueue':
                    print(f"キュー '{queue_url}' が存在しません。")
                    policies[queue_url] = {}
                else:
                    print(f"SQSキュー '{queue_url}' のポリシー取得中にエラーが発生しました: {e}")
                    policies[queue_url] = {}
            except Exception as e:
                print(f"SQSキュー '{queue_url}' のポリシー取得中に予期しないエラーが発生しました: {e}")
                policies[queue_url] = {}
    except ClientError as e:
        print(f"Error listing SQS queues: {e}")
    return policies

def get_lambda_policies():
    policies = {}
    try:
        response = lambda_client.list_functions()
        functions = response.get('Functions', [])
        for function in functions:
            function_name = function['FunctionName']
            try:
                policy_response = lambda_client.get_policy(FunctionName=function_name)
                policy = json.loads(policy_response['Policy'])
                policies[function_name] = policy
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'ResourceNotFoundException':
                    policies[function_name] = {}
                else:
                    print(f"Lambda関数 '{function_name}' のポリシー取得中にエラーが発生しました: {e}")
                    policies[function_name] = {}
            except Exception as e:
                print(f"Lambda関数 '{function_name}' のポリシー取得中に予期しないエラーが発生しました: {e}")
                policies[function_name] = {}
    except ClientError as e:
        print(f"Error listing Lambda functions: {e}")
    return policies

def check_principal_arns(policy_dict, resource_type):
    problematic_resources = []
    for resource, policy in policy_dict.items():
        if 'Statement' in policy:
            statements = policy['Statement']
            if not isinstance(statements, list):
                statements = [statements]
            for stmt in statements:
                principals = []
                if 'Principal' in stmt:
                    principal = stmt['Principal']
                    if isinstance(principal, dict):
                        for key, value in principal.items():
                            if isinstance(value, str):
                                principals.append(value)
                            elif isinstance(value, list):
                                principals.extend(value)
                    elif isinstance(principal, str):
                        principals.append(principal)
                for principal_arn in principals:
                    if isinstance(principal_arn, str) and (principal_arn.startswith('AROA') or principal_arn.startswith('AIDA')):
                        # 管理コンソールURLの生成
                        if resource_type == 's3':
                            url = f"https://s3.console.aws.amazon.com/s3/buckets/{resource}/permissions?tab=permissions"
                        elif resource_type == 'sns':
                            url = f"https://console.aws.amazon.com/sns/v3/home?region={AWS_REGION}#/topics/{resource}/permissions"
                        elif resource_type == 'sqs':
                            url = f"https://console.aws.amazon.com/sqs/v2/home?region={AWS_REGION}#/queue/{resource}/permissions"
                        elif resource_type == 'lambda':
                            url = f"https://console.aws.amazon.com/lambda/home?region={AWS_REGION}#/functions/{resource}/permissions"
                        else:
                            url = ""
                        problematic_resources.append({
                            'ResourceType': resource_type.upper(),
                            'Resource': resource,
                            'PrincipalARN': principal_arn,
                            'Policy': policy,
                            'URL': url
                        })
    return problematic_resources

def lambda_handler(event, context):
    all_problems = []

    # 監視対象のバケットを取得
    if BUCKETS_TO_MONITOR and BUCKETS_TO_MONITOR != ['']:
        s3_buckets = BUCKETS_TO_MONITOR
    else:
        s3_buckets = get_s3_buckets()

    # S3ポリシー取得とチェック
    s3_policies = get_s3_bucket_policies(s3_buckets)
    s3_problems = check_principal_arns(s3_policies, 's3')
    all_problems.extend(s3_problems)

    # SNSポリシー取得とチェック
    sns_policies = get_sns_policies()
    sns_problems = check_principal_arns(sns_policies, 'sns')
    all_problems.extend(sns_problems)

    # SQSポリシー取得とチェック
    sqs_policies = get_sqs_policies()
    sqs_problems = check_principal_arns(sqs_policies, 'sqs')
    all_problems.extend(sqs_problems)

    # Lambdaポリシー取得とチェック
    lambda_policies = get_lambda_policies()
    lambda_problems = check_principal_arns(lambda_policies, 'lambda')
    all_problems.extend(lambda_problems)

    if all_problems:
        message = "以下のリソースでプリンシパルに'AROA'または'AIDA'で始まるARNが検出されました。\n\n"
        for problem in all_problems:
            message += (
                f"リソースタイプ: {problem['ResourceType']}\n"
                f"リソース名: {problem['Resource']}\n"
                f"プリンシパルARN: {problem['PrincipalARN']}\n"
                f"ポリシー: {json.dumps(problem['Policy'], indent=2)}\n"
                f"管理コンソールURL: {problem['URL']}\n\n"
            )

        # SNSで通知
        try:
            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject='リソースベースポリシーのプリンシパルにAROA/AIDAが検出されました',
                Message=message
            )
            print(f"{len(all_problems)} 件の問題を検出し、通知を送信しました。")
        except ClientError as e:
            print(f"SNSへの通知送信中にエラーが発生しました: {e}")

        return {
            'statusCode': 200,
            'body': json.dumps(f"{len(all_problems)} 件の問題を検出し、通知を送信しました。")
        }
    else:
        print("問題は検出されませんでした。")
        return {
            'statusCode': 200,
            'body': json.dumps('問題は検出されませんでした。')
        }

上記コードをデプロイしたら、テストしてみましょう。
テストタブのイベント JSON に {} と記述しテストを実行してみてください。

EventBridge ルールを作成

EventBridge ルールを作成しましょう。
Lambda の「トリガーを追加」から EventBridge を選択し、下記ドキュメントを参考にお好みのスケジュールを作成します。

https://docs.aws.amazon.com/ja_jp/eventbridge/latest/userguide/eb-scheduled-rule-pattern.html

今回は下記のように設定しました。
毎週水曜日の 8:30(UTC) にイベントスケジュールを作成しています。

cron(30 8 ? * WED *)

検証用のバケットと IAM ユーザーおよび IAM ロール作成

最後に、検証用のバケットと IAM ユーザーおよび IAM ロールを作成しましょう。
それぞれ以下のような名称で作成します。

S3 バケット:testdeleteprincipal
IAM ユーザー:TestDeletePrincipalIAMUser
IAM ロール:TestDeletePrincipalIAMRole

そして、バケットポリシーを以下のように設定しましょう。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowTestUserAccess",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::xxxxxxxxxxxx:user/TestDeletePrincipalIAMUser",
                    "arn:aws:iam::xxxxxxxxxxxx:role/TestDeletePrincipalIAMRole"
                ]
            },
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::testdeleteprincipal/*"
        }
    ]
}

バケットポリシーを設定後、IAM ユーザーあるいは IAM ロールを削除しておきます。
今回はIAM ユーザー TestDeletePrincipalIAMUser を削除しておきました。

検証結果

設定お疲れ様でした!
さて、実際にスケジュールした時刻を待って(あるいは Lambda のテスト実行)により、関数を動かしてみましょう。

すると以下のようなメールが指定したメールアドレスに届きます。

Photokako-mosaic-d0F60xTvdJR8C0Kb

実際に当該バケット testdeleteprincipal のバケットポリシーをのぞいてみると以下のようになっていました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowTestUserAccess",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "AIDAxxxxxxxxxxxxxxxxx",
                    "arn:aws:iam::xxxxxxxxxxxx:role/TestDeletePrincipalIAMRole"
                ]
            },
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::testdeleteprincipal/*"
        }
    ]
}

通知メール同様、削除された IAM ユーザーを指定していたプリンシパルが AIDA から始まる文字列に置き換わっていますね!

まとめ

お疲れ様でした!長くなりましたが、今回の検証は終わりです!
無効となり、自動的に置き換わったプリンシパルを検知およびメール通知させるにはなかなか手間がかかりましたね。プリンシパルが置き換わってしまうのは AWS 側の変更なので CloudTrail には記録されず検知するには少し工夫が必要ですね。

ただ、大量のリソースの中から人力で探し出すよりは遥かに楽になるかと思います。
気になった方は試してみると良いかもしれません!

参考文献

アノテーション株式会社

アノテーション株式会社は、クラスメソッド社のグループ企業として「オペレーション・エクセレンス」を担える企業を目指してチャレンジを続けています。「らしく働く、らしく生きる」のスローガンを掲げ、様々な背景をもつ多様なメンバーが自由度の高い働き方を通してお客様へサービスを提供し続けてきました。現在当社では一緒に会社を盛り上げていただけるメンバーを募集中です。少しでもご興味あれば、アノテーション株式会社WEBサイトをご覧ください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.